home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyo (Python 2.5)
-
- from __future__ import with_statement
- import wx
- import os.path as os
- from collections import defaultdict
- from util import traceguard, dictadd, dictsub, do, try_this, memoize
- import util.data_importer as importer
- from path import path
- from common.slotssavable import SlotsSavable
- from os.path import exists as path_exists
- from common import profile, pref
- from pprint import pformat, pprint
- from logging import getLogger
- log = getLogger('notifications')
- from wx import PyDeadObjectError
- PlaySound = wx.Sound.PlaySound
- from Queue import Queue, Empty
- from weakref import ref
-
- class cancellables(Queue):
-
- def __init__(self):
- Queue.__init__(self)
-
-
- def cancel(self):
- n = 0
-
- try:
- while True:
- item = self.get(block = False)
- if isinstance(item, ref):
- item = item()
-
- if item is not None:
-
- try:
- wx.CallAfter(item.cancel)
- except PyDeadObjectError:
- pass
- except Exception:
- print_exc = print_exc
- import traceback
- print_exc()
-
- n += 1
- continue
- except Empty:
- pass
-
- log.info('cancelled %d notification reactions', n)
-
-
-
- def fire(topic, **info):
- log.debug('fired topic %r', topic)
- topics = _topics_from_string(topic)
- types = set()
- cancels = cancellables()
- fired = False
-
- try:
- notifications = profile.notifications
- except AttributeError:
- return log.warning('no notifications yet')
-
- always_show = info.get('always_show', [])
- always_show = None if always_show is not None else set()
- always_show.update(always_fire.get(topic, []))
- if 'buddy' in info:
-
- try:
- idstr = info['buddy'].idstr()
- buddy_events = notifications.get(idstr, [])
- except:
- buddy_events = []
-
- log.debug('found %d buddy specific events', len(buddy_events))
- if buddy_events:
- fired = True
- firetopics(topics, buddy_events, types = types, cancels = cancels, **info)
-
-
- generic_events = notifications.get(None, [])
- if generic_events:
- fired = True
- firetopics(topics, generic_events, types = types, cancels = cancels, **info)
-
- if always_show:
- import gui.notificationview as nview
- G = globals()
- reactions = (set,)((lambda .0: for name in .0:
- G[name])(always_show))
- ninfo = nview.get_notification_info()
- for reaction in reactions - types:
- args = dictadd(ninfo.get(topic, { }), info)
-
- def doit(cancels = cancels, reaction = reaction, args = args):
- cancellable = reaction()(**args)
- if hasattr(cancellable, 'cancel'):
- cancels.put(ref(cancellable))
-
-
- wx.CallAfter(doit)
- fired = True
-
-
-
- try:
- on_done = info['on_done']
- except KeyError:
- pass
-
- if not fired:
- log.info('Calling on_done callback')
- traceguard.__enter__()
-
- try:
- on_done()
- finally:
- pass
-
-
- return cancels
-
-
- def firetopics(topics, events, types, cancels, **info):
- import gui.notificationview as nview
- if types is None:
- types = set()
-
- ninfo = nview.get_notification_info()
- for topic in topics:
- for event in events.get(topic, []):
- (reaction, eventinfo) = getReaction(event, topic)
- if reaction in types or reaction is None:
- continue
-
- template_info = ninfo.get(topic, { })
-
- def doit(cancels = cancels, reaction = reaction, args = dictadd(template_info, info), eventinfo = eventinfo):
- cancellable = reaction(**eventinfo)(**args)
- if hasattr(cancellable, 'cancel'):
- cancels.put(ref(cancellable))
-
-
- wx.CallAfter(doit)
- types.add(reaction)
-
-
- return types
-
-
- def getReaction(mapping, topic):
- mapping = dict(mapping)
- reaction = mapping.pop('reaction')
- if isinstance(reaction, basestring):
- reaction = globals().get(reaction, None)
-
- if reaction not in reactions_set:
- return (None, None)
-
- if reaction is Sound:
- active_soundset = active_soundset
- SoundsetException = SoundsetException
- import gui.notifications.sounds
-
- try:
- soundset = active_soundset()
- except SoundsetException:
- soundset = { }
-
-
- try:
- sound_filename = soundset[topic]
- except KeyError:
- return (None, None)
-
- mapping.update(soundfile = sound_filename)
-
- return (reaction, mapping)
-
-
- class Reaction(object):
-
- def preview(self):
- pass
-
-
- def allowed(self):
- cname = type(self).__name__.lower()
- away = profile.status.away
- if pref('notifications.enable_%s' % cname, True):
- if away:
- pass
- return bool(not pref('messaging.when_away.disable_%s' % cname, False))
-
- allowed = property(allowed)
-
-
- class Sound(Reaction):
- desc = 'Play sound %(filename)s'
-
- def __init__(self, soundfile):
- self.soundfile = soundfile
-
-
- def __call__(self, **k):
- if not self.allowed:
- return None
-
- import gui.native.helpers as helpers
- if pref('fullscreen.disable_sounds', False) and helpers.FullscreenApp():
- return None
-
- if path_exists(self.soundfile):
- PlaySound(self.soundfile)
-
-
-
- def preview(self):
- self()
-
-
- def __repr__(self):
- return '<Sound %s>' % path(self.filename).basename()
-
-
-
- class Alert(Reaction):
- desc = 'Show alert "%(msg)s"'
-
- def __init__(self, msg):
- self.msg = msg
-
-
- def __call__(self, **info):
- if not self.allowed:
- return None
-
- wx.MessageBox(self.msg)
-
-
-
- class Popup(Reaction):
-
- def desc(cls, info):
- return 'Show a popup notification'
-
- desc = classmethod(desc)
-
- def __init__(self, sticky = False):
- self.sticky = sticky
-
-
- def __call__(self, **info):
- import gui.native.helpers as helpers
- if pref('fullscreen.disable_popups', True) and helpers.FullscreenApp():
- return None
-
- if self.allowed or info.get('always_show', False):
- popup = popup
- import gui.popup
- cpy = vars(self).copy()
- cpy.update(info)
- return popup(**cpy)
-
-
-
-
- class ShowContactList(Reaction):
- DEFAULT_DURATION_SEC = 3
- desc = 'Show the contact list for %(duration)s seconds'
-
- def __init__(self, duration):
- self.duration = duration
-
-
- def __call__(self):
- print 'do buddylist stuff for', self.duration, 'sec'
-
-
-
- class LogMessage(Reaction):
-
- def __init__(self, msg):
- self.msg = msg
-
-
- def __call__(self, **info):
- if self.msg.find('%') != -1 and info.get('buddy', None) is not None:
- log.info(self.msg, info['buddy'])
- else:
- log.info(self.msg)
-
-
-
- class StartCommand(Reaction):
- desc = 'Start external command "%(path)s"'
-
- def __init__(self, path):
- self.path = path
-
-
- def __call__(self, **info):
- os.startfile(self.path)
-
-
- def preview(self):
- self()
-
-
-
- def get_notification_info(_cache = []):
-
- try:
- return _cache[0]
- except:
- pass
-
- skin = skin
- import gui
- mod = importer.yaml_import('notificationview', loadpath = [
- skin.resourcedir()])
- nots = _process_notifications_list(mod.__content__)
- _cache.append(nots)
- return nots
-
-
- def _process_notifications_list(nots_list):
- odict_from_dictlist = odict_from_dictlist
- import util
- nots = odict_from_dictlist(nots_list)
- ordered_underscores_to_dots(nots)
- update_always_fire(nots)
- return nots
-
-
- def add_notifications(nots_list):
- mynots = _process_notifications_list(nots_list)
- nots = get_notification_info()
- for k in mynots:
- nots[k] = mynots[k]
-
-
- always_fire = { }
-
- def update_always_fire(nots):
- always_fire.clear()
- for name, info in nots.iteritems():
- if 'always_show' in info:
- reactions = info.get('always_show')
- if isinstance(reactions, basestring):
- reactions = [
- reactions]
-
- always_fire[name] = reactions
- continue
-
-
-
- def ordered_underscores_to_dots(d):
- ordered_keys = d._keys[:]
- for i, key in enumerate(list(ordered_keys)):
- if key and '_' in key:
- new_key = key.replace('_', '.')
- d[new_key] = d.pop(key)
- ordered_keys[i] = new_key
- continue
-
- d._keys = ordered_keys
-
- from common.message import StatusUpdateMessage
-
- class IMWinStatusUpdate(Reaction):
-
- def __init__(self, **info):
- self.info = info
-
-
- def __call__(self, **info):
- self.info.update(info)
- on_done = info.pop('on_done', (lambda : pass))
- on_status = on_status
- import gui.imwin.imhub
- wx.CallAfter(on_status, StatusUpdateMessage(**self.info), on_done)
-
-
- reactions = [
- Popup,
- Alert,
- Sound,
- ShowContactList,
- StartCommand,
- IMWinStatusUpdate]
- if 'wxMac' in wx.PlatformInfo:
-
- class BounceDockIcon(Reaction):
-
- def __call__(self, **info):
- wx.GetApp().MacRequestUserAttention(wx.NOTIFY_REPEAT)
-
-
- reactions.extend([
- BounceDockIcon])
-
- reactions_set = set(reactions)
- default_notifications = {
- None: {
- 'contact.returnsfromidle': [],
- 'email.new': [
- {
- 'reaction': 'Popup' }],
- 'error': [
- {
- 'reaction': 'Popup' }],
- 'facebook.alert': [
- {
- 'reaction': 'Popup' }],
- 'facebook.newsfeed': [
- {
- 'reaction': 'Popup' }],
- 'filetransfer.ends': [
- {
- 'reaction': 'Popup' }],
- 'filetransfer.error': [
- {
- 'reaction': 'Popup' }],
- 'filetransfer.request': [
- {
- 'reaction': 'Popup' }],
- 'message.received.background': [
- {
- 'reaction': 'Popup' }],
- 'message.received.initial': [
- {
- 'reaction': 'Sound' }],
- 'myspace.alert': [
- {
- 'reaction': 'Popup' }] } }
- for topic in [
- 'contact.signon',
- 'contact.signoff',
- 'contact.available',
- 'contact.away',
- 'contact.returnsfromidle',
- 'contact.idle']:
- seq = default_notifications[None].setdefault(topic, [])
- seq += [
- {
- 'reaction': 'IMWinStatusUpdate' }]
-
-
- class Notification(SlotsSavable):
- pass
-
- TOPIC_SEP = '.'
-
- def _topics_from_string(topic):
- _check_topic_string(topic)
- topiclist = topic.split(TOPIC_SEP)
- topics = []([ TOPIC_SEP.join(topiclist[:x]) for x in xrange(1, len(topiclist) + 1) ])
- return list(topics)
-
- _topics_from_string = memoize(_topics_from_string)
-
- def _check_topic_string(topic):
- if not isinstance(topic, basestring):
- raise TypeError('topic must be a string')
-
- if topic.find('..') != -1:
- raise ValueError('consecutive periods not allowed in topic')
-
- if topic.startswith('.') or topic.endswith('.'):
- raise ValueError('topic cannot start or end with a topic')
-
-
-